<?php
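/**
 * author: zhihong.yin
 * description: imports a full (存量) CSV file from FTP: download it, split it into smaller files, and load them into the database
 * usage: php <this file> <ftp URL of the CSV file>, e.g. php <this file> ftp://user:pass@host/path/file.csv
 */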
// Runtime settings for a long-running import: report everything and never time out.
error_reporting(E_ALL);
set_time_limit(0);
header('Content-Type: text/html; charset=utf-8');
// Framework entry point: common.php lives five directory levels above this file.
include_once dirname(dirname(dirname(dirname(__DIR__)))) . '/common.php';
// Modules used below: m_config (database handle) and m_split_file (CSV splitting).
\ns_core\m_load::load('ns_core.m_config');
\ns_core\m_load::load('ns_core.m_split_file');
class incre_csv_import_db
{
    // FTP URL of the source CSV, expected form ftp://user:pass@host[:port]/path/file.csv
    public $ftp_url;
    // FTP connection handle returned by ftp_connect()
    private $ftp_conn;

    // Path of the CSV on the FTP server (path component of $ftp_url)
    private $ftp_file_path;

    // Data type detected from the first CSV line: 'category' or 'asset'
    private $flag_category_aset;

    // Database handle
    private $db;

    // Database holding the import tables. It is selected by every process (the parent and the
    // per-file child processes) so that all of them read and write the same tables.
    private $database = 'nn_bk';

    // Number of rows buffered into one multi-row INSERT; must not exceed the split file size
    private $read_memory_rows = 1000;

    // Asset CSV header name => column of nns_asset_source_incre
    private $asset_column_map = array (
        'assetid' => 'nns_assetid',
        'id' => 'nns_id',
        'category_id' => 'nns_category_id',
        '节目名称' => 'nns_name',
        '导演' => 'nns_director',
        '演员' => 'nns_actor',
        '海报' => 'nns_image',
        'category_type' => 'category_type',
        '集数' => 'nns_index',
        'COUNTRY_OF_ORIGIN' => 'nns_country',
        '地区' => 'nns_area',
        '年代' => 'nns_year',
        '类型' => 'nns_type',
    );

    // Asset table columns in CSV header order; read from the first file and handed to child processes
    public $asset_column_order = array();

    /***** File splitting settings *****/

    // Maximum number of lines per split file
    private $split_file_rows = 10000;

    // Base name (without extension) of the downloaded CSV
    private $file_name;
    // Extension of the downloaded CSV (validated to be 'csv')
    private $file_extension;
    // Local directory holding the downloaded CSV and its split files
    private $dir_store_split_file;

    // File name prefix handed to m_split_file for the split files
    private $prefix_splited_file = 'splited_';

    // Columns of nns_asset_category_incre, in the order the '|'-separated category fields appear
    private $asset_catogory_column = array(
        'nns_id',
        'nns_parent_id',
        'nns_name',
        'nns_deep',
        'nns_modify_time',
        'nns_extra_param_one',
        'nns_extra_param_two'
    );

    /**
     * Validates the environment and settings, opens the database handle and selects the import database.
     */
    public function __construct()
    {
        if( strtolower(PHP_OS) != 'linux')
        {
            $this->returnMsg('该脚本需lixnux下运行');
        }

        if( $this->read_memory_rows > $this->split_file_rows || $this->read_memory_rows > 10000)
        {
            $this->returnMsg('读入内存的行数不能大于切割文件的行数，且不能大于10000');
        }
        $this->db = m_config::get_dc()->db();
        // Child processes never call create_table(), so the database must be selected here;
        // otherwise they would insert into the connection's default database.
        $this->select_database();
    }

    /**
     * Parent-process entry point: download and split the CSV, import every split file, then
     * (for asset data) build the one-row-per-asset group table.
     */
    public function init()
    {
        // NOTE(review): the category table is dropped and recreated on every run, even when the
        // CSV later turns out to be an asset file.
        $this->create_table('category');
        echo '------------------开始执行切割----------------------'.PHP_EOL;
        $this->split();
        echo '-----------切割完毕并开始多进程读取入库 ---------------'.PHP_EOL;
        $this->get_files_call_insert();
        echo '------------------入库完毕---------------------------'.PHP_EOL;
        if( $this->flag_category_aset  == 'asset')
        {
            echo '------------------- 开始分组--------------------------'.PHP_EOL;
            $this->incre_db_group();
            echo '--------------------分组完毕----------------------------'.PHP_EOL;
        }
    }

    /**
     * Rebuilds nns_asset_source_incre_group so that it holds exactly one row of
     * nns_asset_source_incre per nns_assetid (the row with the smallest nns_id).
     */
    public function incre_db_group()
    {
        $re = nl_execute_by_db("truncate nns_asset_source_incre_group", $this->db);
        if( empty($re) )
        {
            echo '清空nns_asset_source_incre_group失败'.PHP_EOL;
        }

        // "SELECT * ... GROUP BY nns_assetid" is rejected under ONLY_FULL_GROUP_BY (MySQL 5.7+ default)
        // and otherwise keeps an arbitrary row per group; joining on MIN(nns_id) (the primary key)
        // selects exactly one deterministic row per nns_assetid.
        $sql = "insert into nns_asset_source_incre_group
                    select s.* from nns_asset_source_incre s
                    inner join (
                        select min(nns_id) as nns_id from nns_asset_source_incre group by nns_assetid
                    ) g on s.nns_id = g.nns_id";
        $re = nl_execute_by_db($sql, $this->db);
        if( empty($re) )
        {
            echo '分组插入nns_asset_source_incre_group失败'.PHP_EOL;
        }
    }

    /**
     * Parses $ftp_url, logs in to the FTP server and derives the local file/directory names.
     * Exits through returnMsg() when the URL does not point to a .csv file.
     */
    private function before_split()
    {
        $ftp_url_info = parse_url($this->ftp_url);
        $this->my_ftp_login($ftp_url_info);

        $this->ftp_file_path = $ftp_url_info['path'];

        $arr_file = pathinfo($ftp_url_info['path']);
        $this->file_name = isset($arr_file['filename'])?$arr_file['filename']:'';
        $this->file_extension = (isset($arr_file['extension']) && $arr_file['extension'] == 'csv' )?$arr_file['extension']:$this->returnMsg('文件类型应为csv');
        // A fresh random directory per run keeps split files of different runs apart
        $this->dir_store_split_file ='/tmp/incre_csv_import_db/'.np_guid_rand();
    }

    /**
     * Downloads the CSV from FTP into a new local directory and splits it into files of at most
     * $split_file_rows lines using m_split_file. Exits through returnMsg() on any failure.
     */
    private function split()
    {
        $this->before_split();

        // The directory must be new so that no unrelated files end up being imported
        if( is_dir($this->dir_store_split_file) )
        {
            $this->returnMsg('请指定一个未创建的路径');
        }

        if( !mkdir($this->dir_store_split_file,0777,true) )
        {
            $this->returnMsg('创建切割目录失败');
        }

        $file_local = $this->dir_store_split_file.'/'.$this->file_name.'.'.$this->file_extension;
        // Strip the leading '/': the path is used relative to the FTP login directory
        $file_remote = trim($this->ftp_file_path,'/');

        // ftp_size() returns -1 when the file does not exist (or the size cannot be determined)
        $file_is_exist = ftp_size($this->ftp_conn,$file_remote);

        if( $file_is_exist == -1)
        {
            $this->returnMsg('指定路径的csv文件不存在');
        }
        else if( empty($file_is_exist) )
        {
            $this->returnMsg('文件内容为空');
        }

        echo '----------------------下载csv中--------------------'.PHP_EOL;
        $ftp_download = ftp_get($this->ftp_conn,$file_local,$file_remote,FTP_BINARY);

        if( !$ftp_download )
        {
            $this->returnMsg('下载csv文件失败');
        }
        echo '----------------------下载成功---------------------'.PHP_EOL;

        echo '---------------------开始切割文件-------------------'.PHP_EOL;
        $m_split_file = new m_split_file($file_local,$this->file_extension,$this->dir_store_split_file,$this->dir_store_split_file,1,$this->split_file_rows,$this->prefix_splited_file);
        $file_split = $m_split_file->run();

        if( $file_split['ret'] !=0 )
        {
            $this->returnMsg('切割文件失败');
        }
        echo '--------------------- 切割文件成功------------------'.PHP_EOL;
    }

    /**
     * Connects and logs in to the FTP server described by a parse_url() result.
     * Honours an explicit port in the URL (default 21). Exits through returnMsg() on failure.
     */
    private function my_ftp_login($ftp_url_info)
    {
        if( !isset($ftp_url_info['user']) || !isset($ftp_url_info['pass']) || !isset($ftp_url_info['path']) || !isset($ftp_url_info['host']))
        {
            $this->returnMsg('ftp链接不正确');
        }

        $port = isset($ftp_url_info['port']) ? (int)$ftp_url_info['port'] : 21;
        $this->ftp_conn = ftp_connect($ftp_url_info['host'], $port);
        if( !$this->ftp_conn )
        {
            $this->returnMsg('无法连接ftp服务');
        }

        $ftp_login_status = ftp_login($this->ftp_conn,$ftp_url_info['user'],$ftp_url_info['pass']);
        if( !$ftp_login_status )
        {
            $this->returnMsg('登录信息不正确');
        }
    }

    /**
     * Imports every split CSV file. The first CSV is imported in this process; this is where the
     * asset header is read. Each following file runs in a child PHP process started with
     * "<this file> <split file> <comma-separated column order>".
     * Children run one after another (exec() waits), so all rows exist before incre_db_group().
     */
    public function get_files_call_insert()
    {
        if( !is_dir($this->dir_store_split_file) )
        {
            $this->returnMsg('切割保存路径不存在');
        }

        $arr_files = m_file::get_files($this->dir_store_split_file);
        if( !is_array($arr_files) || empty($arr_files) )
        {
            $this->returnMsg('指定目录无切割文件');
        }

        $str_exe = evn::get("php_execute_path") . ' -f ' . $this->shell_quote(__FILE__);

        $is_first_file = true;
        foreach( $arr_files as $file )
        {
            $arr_pathinfo = pathinfo($file);
            if( !isset($arr_pathinfo['extension']) || $arr_pathinfo['extension'] !== 'csv' )
            {
                continue;
            }

            if( $is_first_file )
            {
                // The first CSV carries the header; importing it here fills asset_column_order
                $is_first_file = false;
                $this->read_split_insert_db($file);
                continue;
            }

            // Always pass both arguments, quoted: an empty column order (category data) must still
            // arrive as an (empty) third argv entry or the child takes the FTP-URL branch.
            $str_order = $str_exe.' '.$this->shell_quote($file).' '.$this->shell_quote(implode(',',$this->asset_column_order));
            $output = array();
            $status = 0;
            exec($str_order,$output,$status);
            if( !empty($output) )
            {
                echo implode(PHP_EOL,$output).PHP_EOL;
            }
            if( $status !== 0 )
            {
                echo "子进程导入文件失败(退出码{$status}): {$file}".PHP_EOL;
            }
        }
    }

    /**
     * Quotes one shell argument with POSIX single quotes. Unlike escapeshellarg(), this does not
     * depend on the locale, which can silently drop multibyte characters under LC_CTYPE=C.
     */
    private function shell_quote($arg)
    {
        return "'" . str_replace("'", "'\\''", (string)$arg) . "'";
    }

    /**
     * Escapes a value and wraps it in single quotes for use in a SQL literal.
     * NOTE(review): addslashes() is not charset-aware; this assumes a UTF-8 connection.
     */
    private function addslashes_and_quotes($str) {
        return sprintf("'%s'", addslashes($str));
    }

    /**
     * Reads one split CSV file and inserts its rows in batches of $read_memory_rows.
     *
     * The first non-blank line decides the type. One or two CSV fields means category data
     * ('|'-separated, 7 fields). Anything wider is asset data, whose first line is the header
     * when asset_column_order is still empty. Rows with the wrong number of fields are reported
     * and skipped, so they cannot fail the whole multi-row INSERT.
     *
     * @param string $file_csv path of a local .csv file
     */
    public function read_split_insert_db($file_csv)
    {
        $arr_pathinfo = pathinfo($file_csv);
        if( empty($arr_pathinfo['extension']) || $arr_pathinfo['extension'] !== 'csv')
        {
            $this->returnMsg('文件格式不是csv');
        }

        $file_res = fopen($file_csv,'r');
        if( $file_res === false )
        {
            $this->returnMsg("无法打开文件{$file_csv}");
        }
        if( !flock($file_res,LOCK_EX) )
        {
            $this->returnMsg('不能锁定文件');
        }

        $sql_category_head = 'INSERT INTO `nns_asset_category_incre` (`'.implode('`, `',$this->asset_catogory_column).'`) values ';
        // Child processes receive the column order up front; the parent builds it from the header
        $sql_asset_head = empty($this->asset_column_order) ? '' : $this->build_asset_insert_head();

        // "(...)" value tuples not yet written to the database
        $pending_values = array();
        $is_first_line = true;

        while( ($arr_line = fgetcsv($file_res, 0, ',', '"', '\\')) !== false )
        {
            if( $this->is_blank_csv_line($arr_line) )
            {
                continue;
            }

            if( $is_first_line )
            {
                // Strip a BOM before the encoding conversion, which could otherwise turn it into garbage characters
                $arr_line[0] = $this->strip_utf8_bom($arr_line[0]);
                $this->flag_category_aset = (count($arr_line) == 1 || count($arr_line) == 2 )?'category':'asset';
            }

            $arr_line = array_map(array($this,'convert_to_utf8'),$arr_line);

            if( $this->flag_category_aset == 'category' )
            {
                $arr_category_line = $this->split_category_line($arr_line);
                if( count($arr_category_line) != count($this->asset_catogory_column) )
                {
                    echo '栏目数据列数不正确,已跳过: '.implode(',',$arr_line).PHP_EOL;
                }
                else
                {
                    $pending_values[] = $this->build_values_tuple($arr_category_line);
                }
            }
            elseif( $is_first_line && empty($this->asset_column_order) )
            {
                // Header line of the first asset file: map CSV names to table columns, then (re)create the tables
                $this->asset_column_order = $this->parse_asset_header($arr_line);
                $sql_asset_head = $this->build_asset_insert_head();
                $this->create_table('asset');
            }
            elseif( count($arr_line) != count($this->asset_column_order) )
            {
                echo '媒资数据列数与表头不一致,已跳过: '.implode(',',$arr_line).PHP_EOL;
            }
            else
            {
                $pending_values[] = $this->build_values_tuple($arr_line);
            }

            $is_first_line = false;

            if( count($pending_values) >= $this->read_memory_rows )
            {
                $this->execute_batch_insert($this->flag_category_aset == 'category' ? $sql_category_head : $sql_asset_head, $pending_values);
                $pending_values = array();
            }
        }

        flock($file_res,LOCK_UN);
        fclose($file_res);

        // Write the last partial batch, if any
        if( !empty($pending_values) )
        {
            $this->execute_batch_insert($this->flag_category_aset == 'category' ? $sql_category_head : $sql_asset_head, $pending_values);
        }
    }

    // True for lines fgetcsv() yields without data (a blank line is returned as array(null))
    private function is_blank_csv_line($arr_line)
    {
        return !is_array($arr_line) || empty($arr_line) || (count($arr_line) == 1 && $arr_line[0] === null);
    }

    // Removes a leading UTF-8 byte order mark from a string
    private function strip_utf8_bom($value)
    {
        $value = (string)$value;
        return strncmp($value, "\xEF\xBB\xBF", 3) === 0 ? (string)substr($value, 3) : $value;
    }

    // Converts one CSV cell to UTF-8; values whose encoding cannot be detected or converted are kept as-is
    private function convert_to_utf8($value)
    {
        $value = (string)$value;
        if( $value === '' )
        {
            return $value;
        }
        $encoding = mb_detect_encoding($value, array('GBK', 'UTF-8', 'UTF-16LE', 'UTF-16BE', 'ISO-8859-1'));
        if( $encoding === false )
        {
            return $value;
        }
        $converted = iconv($encoding, 'UTF-8', $value);
        return $converted === false ? $value : $converted;
    }

    // Category rows are '|'-separated; re-join on ',' first in case the CSV parser split a field containing commas
    private function split_category_line(array $arr_line)
    {
        return explode('|', implode(',', $arr_line));
    }

    // Maps asset CSV header names to table columns; exits on unknown or duplicated columns
    private function parse_asset_header(array $arr_line)
    {
        $column_order = array();
        foreach( $arr_line as $column )
        {
            $column = trim($column);
            if( !array_key_exists($column, $this->asset_column_map) )
            {
                $this->returnMsg("csv中{$column}字段非法");
            }
            $column_order[] = $this->asset_column_map[$column];
        }

        // A repeated column would produce an invalid INSERT column list
        if( count($column_order) != count(array_unique($column_order)) )
        {
            $this->returnMsg('csv中存在重复字段');
        }
        return $column_order;
    }

    // INSERT prefix for the asset table using the current asset_column_order
    private function build_asset_insert_head()
    {
        return 'insert into nns_asset_source_incre('.implode(',',$this->asset_column_order).') values ';
    }

    // Formats one row as an escaped, quoted SQL value tuple: ('a','b',...)
    private function build_values_tuple(array $cells)
    {
        return '('.implode(',',array_map(array($this,'addslashes_and_quotes'),$cells)).')';
    }

    // Runs one multi-row INSERT; prints the statement when nl_execute_by_db() reports failure
    private function execute_batch_insert($sql_head, array $values)
    {
        $sql = $sql_head.implode(',',$values);
        $re = nl_execute_by_db($sql,$this->db);
        if( empty($re) )
        {
            echo '批量插入失败: '.$sql.PHP_EOL;
        }
        return $re;
    }

    // Makes the import database the default for this connection
    private function select_database()
    {
        nl_execute_by_db("use `{$this->database}`",$this->db);
    }

    /**
     * Drops and recreates the import tables for the given type.
     *
     * @param string $type 'category' (nns_asset_category_incre) or 'asset'
     *                     (nns_asset_source_incre and nns_asset_source_incre_group)
     */
    private function create_table($type)
    {
        if( !in_array($type,array('category','asset'),true) )
        {
            $this->returnMsg('类型不正确');
        }

        $sqls = array();
        if( $type == 'asset')
        {
            $arr_table = array('nns_asset_source_incre','nns_asset_source_incre_group');
            foreach( $arr_table as $table )
            {
                $sqls[$table] = "
                            CREATE TABLE `{$table}` (
                              `nns_id` bigint(255) NOT NULL,
                              `nns_assetid` varchar(255) DEFAULT NULL,
                              `nns_category_id` varchar(255) DEFAULT NULL,
                              `nns_name` varchar(255) DEFAULT NULL,
                              `nns_director` varchar(255) DEFAULT NULL,
                              `nns_actor` varchar(255) DEFAULT NULL,
                              `nns_image` varchar(255) DEFAULT NULL,
                              `category_type` varchar(255) DEFAULT NULL,
                              `nns_index` tinyint(4) DEFAULT NULL,
                              `nns_country` varchar(255) DEFAULT NULL,
                              `nns_area` varchar(255) DEFAULT NULL,
                              `nns_year` varchar(255) DEFAULT NULL,
                              `nns_type` tinyint(4) DEFAULT NULL,
                              `nns_state` tinyint(2) DEFAULT 0 COMMENT '0-初始状态  |  1- 成功注入资源库 |  2-注入资源库失败  | 3-成功上下线  | 4-上下线失败',
                              `nns_series_import_state` tinyint(2) DEFAULT 0  COMMENT '主媒资注入状态  0-初始 | 1-成功注入 | 2-注入失败',
                              `nns_index_import_state` tinyint(2) DEFAULT 0 COMMENT '分集注入状态  0-初始 | 1-成功注入 | 2-注入失败',
                              `nns_media_import_state` tinyint(2) DEFAULT 0 COMMENT '片源注入状态  0-初始 | 1-成功注入 | 2-注入失败',
                              `nns_series_id` VARCHAR(255)  DEFAULT NULL COMMENT '主媒资注入id',
                              PRIMARY KEY (`nns_id`),
                              KEY `nns_assetid` (`nns_assetid`) USING BTREE
                            ) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
            }
        }
        else
        {
            $sqls['nns_asset_category_incre'] =
                "CREATE TABLE `nns_asset_category_incre` (
                          `nns_id` bigint(255) NOT NULL,
                          `nns_parent_id` varchar(255) DEFAULT NULL,
                          `nns_name` varchar(255) DEFAULT NULL,
                          `nns_deep` tinyint(4) DEFAULT NULL,
                          `nns_modify_time` varchar(255) DEFAULT NULL,
                          `nns_extra_param_one` varchar(255) DEFAULT NULL,
                          `nns_extra_param_two` varchar(255) DEFAULT NULL,
                          PRIMARY KEY (`nns_id`)
                        ) ENGINE=MyISAM DEFAULT CHARSET=utf8;";
        }

        $this->select_database();
        foreach( $sqls as $table=>$sql )
        {
            // NOTE: dropping the table discards any data left from a previous import
            nl_execute_by_db("DROP TABLE IF EXISTS `{$table}`;",$this->db);
            $create_status = nl_execute_by_db($sql, $this->db);
            if( empty($create_status) )
            {
                $this->returnMsg("数据表{$table}创建失败");
            }
        }
    }

    /**
     * Prints an error message and terminates the process with exit status 1, so that a parent
     * process (see get_files_call_insert) can detect the failure. Non-string input is ignored.
     */
    public function returnMsg($message)
    {
        if( is_string($message) )
        {
            echo $message.PHP_EOL;
            exit(1);
        }
    }

    // Closes the FTP connection if one was opened
    public function __destruct()
    {
        if( !empty($this->ftp_conn) )
        {
            ftp_close($this->ftp_conn);
            $this->ftp_conn = null;
        }
    }
}

$incre_csv_import_db = new incre_csv_import_db();

// $argv only exists under the CLI with register_argc_argv enabled; count(null) throws on PHP 8
$cli_args = (isset($argv) && is_array($argv)) ? $argv : array();
$cli_arg_count = count($cli_args);

if( $cli_arg_count == 3 && !empty($cli_args[1]) )
{
    // Child process: argv[1] = split CSV file, argv[2] = comma-separated asset column order
    // (empty for category data)
    $incre_csv_import_db->asset_column_order = ($cli_args[2] === '') ? array() : explode(',',$cli_args[2]);
    $incre_csv_import_db->read_split_insert_db($cli_args[1]);
}
else if( $cli_arg_count == 2 && !empty($cli_args[1]) )
{
    // Parent process: argv[1] = ftp:// URL of the CSV to import
    if( strpos($cli_args[1],'ftp://') !== 0 )
    {
        $incre_csv_import_db->returnMsg('需是ftp链接');
    }
    $incre_csv_import_db->ftp_url = $cli_args[1];
    // Download, split and import all files
    $incre_csv_import_db->init();
}
else if( $cli_arg_count <= 1 )
{
    $incre_csv_import_db->returnMsg('没有ftp下载链接');
}
else
{
    $incre_csv_import_db->returnMsg('非法操作');
}





